Course "Data processing and Visualization", IE500417, NTNU.
Note (as usual): plagiarism is strictly forbidden! You should never copy any source code from other students. If you use any code written by others (except the standards libraries: NumPy, SciPy, Pandas, etc), provide a reference.
If the teachers see that your work is mostly copy+paste from online code snippets, the grade can be reduced.
If a case of plagiarism is detected, it will be reported to the administration.
In this assignment, you will practice digital signal processing (in a rather basic form, there will be no advanced DSP methods). You will work with two signals simultaneously. As it sometimes happens, the two signals are not synchronized: they are sampled at different time moments and with different sampling rate. You will have to resample and synchronize them so that both signals have the same sample timestamps. You will do some analysis of the signals and visualize them using line charts.
The assignment must be handed in on Blackboard. The following must be handed in:
Deadlines and grading information on Blackboard.
import pandas as pd
import numpy as np
from pathlib import Path
import math
import warnings
import os
from datetime import datetime, timedelta
import plotly.express as px
import plotly.graph_objects as go
from scipy.fft import fft, fftfreq
from scipy.signal import resample
from scipy.interpolate import interp1d
def warn(*args, **kwargs):
pass
import warnings
warnings.warn = warn
DATA_DIR = Path("../data")
# utility functions
# Plots freq domain of graph
def plot_freq(amp, freq, threshold):
    """Plot the frequency-domain view of a signal with its amplitude threshold.

    amp       -- array of amplitudes (y values)
    freq      -- array of frequencies (x values), same length as amp
    threshold -- amplitude cut-off, drawn as a dotted horizontal line
    """
    # np.full_like is the idiomatic one-call replacement for np.copy + .fill
    threshold_line = np.full_like(freq, threshold)
    fig = go.Figure([
        go.Scatter(x=freq, y=amp, line=go.scatter.Line(dash="solid", width=1, color="pink"), name="Signal"),
        go.Scatter(x=freq, y=threshold_line, line=go.scatter.Line(dash="dot", width=1, color="green"), name="Threshold"),
    ])
    fig.layout.template = "plotly_dark"
    # One consolidated layout call (the original split this over three calls)
    fig.update_layout(
        title=dict(font=dict(size=20), text="Frequency Domain"),
        title_x=0.5,
        height=600,
        yaxis_title="Amplitude",
        xaxis_title="Frequency",
    )
    # Zoom to the low-frequency, low-amplitude region of interest
    fig.update_xaxes(tickangle=-45, range=[0, 20])
    fig.update_yaxes(range=[0, 0.5])
    fig.show()
# Plots original and resampled data
# Plots original and resampled data
def plot_signals(signal, org_signal):
    """Overlay a resampled signal on top of the original for visual comparison."""
    traces = [
        go.Scatter(x=org_signal['time'], y=org_signal['value'],
                   line=go.scatter.Line(dash="solid", width=1, color="pink"), name="Original"),
        go.Scatter(x=signal['time'], y=signal['value'],
                   line=go.scatter.Line(dash="dot", width=2, color="green"), name="Resampled"),
    ]
    fig = go.Figure(traces)
    fig.layout.template = "plotly_dark"
    fig.update_layout(
        title=dict(font=dict(size=20), text="Signal Values"),
        title_x=0.5,
        height=600,
        yaxis_title="Value",
        xaxis_title="Sample time",
    )
    fig.update_xaxes(tickangle=-45)
    fig.show()
Step 1.1: Load the two signals from CSV files: s1.csv and s2.csv.
# Your code here
# Load the two raw signals; each CSV has a 'time' column and one value column
sig_1 = pd.read_csv(DATA_DIR / "s1.csv")
sig_2 = pd.read_csv(DATA_DIR / "s2.csv")
Step 1.2: Do a quick analysis of what data you got: column names of each signal and number of rows.
# Your code here
# Give both signals the same value-column name so downstream code is signal-agnostic
sig_1 = sig_1.rename(columns={"s1": "value"})  # renaming it to value for consistency
sig_1.head()  # quick look at the first rows
| time | value | |
|---|---|---|
| 0 | 2017-08-29 10:30:00.000 | 15.12 |
| 1 | 2017-08-29 10:30:00.010 | 15.01 |
| 2 | 2017-08-29 10:30:00.020 | 14.51 |
| 3 | 2017-08-29 10:30:00.030 | 14.94 |
| 4 | 2017-08-29 10:30:00.040 | 14.96 |
sig_1.info()
<class 'pandas.core.frame.DataFrame'> RangeIndex: 10000 entries, 0 to 9999 Data columns (total 2 columns): # Column Non-Null Count Dtype --- ------ -------------- ----- 0 time 10000 non-null object 1 value 10000 non-null float64 dtypes: float64(1), object(1) memory usage: 156.4+ KB
sig_1.describe()
| value | |
|---|---|
| count | 10000.000000 |
| mean | 82.165242 |
| std | 38.120285 |
| min | 13.420000 |
| 25% | 49.183750 |
| 50% | 76.800000 |
| 75% | 106.825000 |
| max | 201.240000 |
sig_2 = sig_2.rename(columns={"s2": "value"})  # renaming for consistency
sig_2.head()  # quick look at the first rows
| time | value | |
|---|---|---|
| 0 | 2017-08-29 10:30:00.000 | 241.17 |
| 1 | 2017-08-29 10:30:00.190 | 238.04 |
| 2 | 2017-08-29 10:30:00.400 | 239.95 |
| 3 | 2017-08-29 10:30:00.520 | 242.19 |
| 4 | 2017-08-29 10:30:00.700 | 244.66 |
sig_2.info()
<class 'pandas.core.frame.DataFrame'> RangeIndex: 810 entries, 0 to 809 Data columns (total 2 columns): # Column Non-Null Count Dtype --- ------ -------------- ----- 0 time 810 non-null object 1 value 810 non-null float64 dtypes: float64(1), object(1) memory usage: 12.8+ KB
sig_2.describe()
| value | |
|---|---|
| count | 810.000000 |
| mean | 254.646198 |
| std | 32.345325 |
| min | 182.840000 |
| 25% | 227.762500 |
| 50% | 256.930000 |
| 75% | 282.846250 |
| max | 312.000000 |
Step 1.3: One of the signals is sampled at even frequency, another is not. Find out which is the nonuniformly
sampled signal. Store it in variable signal_x. Store the uniformly sampled signal in variable signal_u.
Note: "find out" here means "write code that finds out". If you will manually assign the signal_u and signal_x variables, you won't get points for this step. The reason - manual assignment is not flexible. If the dataset changes, your remaining notebook calculations will be wrong suddenly. Flexible code that finds the necessary signals would work even if we would swap the s1.csv and s2.csv files.
# Your code here
def to_datetime(datatime_str: str) -> datetime:
    """Parse a timestamp string like '2017-08-29 10:30:00.010' into a datetime."""
    return datetime.strptime(datatime_str, '%Y-%m-%d %H:%M:%S.%f')
def is_sampling_freq_even(sig):
    """Return True when the signal's samples are uniformly spaced in time.

    sig -- DataFrame with a 'time' column of timestamp strings.
    """
    assert sig["time"].shape[0] >= 3, "Need at least 3 points to check"
    # Vectorized parse: pd.to_datetime is faster and more robust than applying
    # datetime.strptime row by row, and diff/nunique avoids the original's
    # index-label-dependent comparison (time_delta[1] == time_delta).
    deltas = pd.to_datetime(sig["time"]).diff().dropna()
    # Uniform sampling <=> every consecutive time difference is identical
    return deltas.nunique() == 1
# Whichever signal is NOT uniformly sampled becomes Signal X; the other is Signal U.
# This stays correct even if s1.csv and s2.csv are swapped.
signal_x = sig_2 if is_sampling_freq_even(sig_1) else sig_1
signal_u = sig_2 if signal_x is not sig_2 else sig_1
# Report which file ended up as which signal (fixed "unifrom" -> "uniform" typo)
print(f"{'Signal 1' if signal_u is sig_1 else 'Signal 2'} is a uniform signal")
print(f"{'Signal 1' if signal_x is sig_1 else 'Signal 2'} is a non-uniform signal")
Signal 1 is a uniform signal Signal 2 is a non-uniform signal
Step 1.4. Plot the two signals in a line chart:
# Your code here
# Work on copies with parsed datetime columns; the *_converted frames are
# reused by later steps (FFT, resampling), so the raw sig_1/sig_2 stay intact.
signal_X_converted = signal_x.copy()
signal_X_converted["time"] = signal_X_converted["time"].apply(to_datetime)
signal_U_converted = signal_u.copy()
signal_U_converted["time"] = signal_U_converted["time"].apply(to_datetime)
# Overlay both signals on a shared time axis
fig = go.Figure([
    go.Scatter(x=signal_U_converted['time'], y=signal_U_converted['value'], line=go.scatter.Line(dash="dash", width=2, color="green"), name="Signal U"),
    go.Scatter(x=signal_X_converted['time'], y=signal_X_converted['value'], line=go.scatter.Line(dash="solid", width=1, color="blue"), name="Signal X")
])
fig.layout.template = "plotly_dark"
fig.update_layout(
    title = dict(
        font = dict(
            size = 20
        ),
        text = "Signal Values"
    ),
    height = 600,
    yaxis_title="Value",
    xaxis_title="Sample time"
)
fig.update_layout(title_x=0.5)  # center the title
fig.update_xaxes(
    tickangle = -45,
)
fig.show()
A reference showing approximately how it could look:
Step 1.5: Find out the sampling frequency of Signal U, save it in variable f_u.
# Your code here
time_per_sample = (signal_U_converted['time'][1] - signal_U_converted['time'][0]).total_seconds()
f_u = 1/ time_per_sample
print(f"The sampling frequency of Signal U: {f_u} Hz")
The sampling frequency of Signal U: 100.0 Hz
Step 1.6: Find out which are the highest frequencies used in Signal U. Save the highest frequency in variable b_u, having Hz as units.
Hint: use Fourier transform, and find max frequency having a significant amplitude. There may be many frequencies with tiny amplitudes. Use some threshold to filter these out.
# Your code here
num_samples = signal_U_converted['value'].shape[0]
np_array = signal_U_converted['value'].to_numpy()
# Amplitude threshold
AMPLITUDE_THRESHOLD = 0.001
# Amplitude (ignoring the complex numbers)
yf = np.abs(fft(np_array, norm="forward").real)
# Frequency
xf = fftfreq(num_samples, time_per_sample)
max_f = float('-inf')
for idx in range(0, len(xf)):
if(yf[idx] < AMPLITUDE_THRESHOLD):
continue
else:
max_f = max( max_f, xf[idx] )
b_u = max_f
print(f"The highest constituent frequncy in Signal U: {b_u} Hz")
plot_freq(yf, xf, threshold=AMPLITUDE_THRESHOLD)
The highest constituent frequncy in Signal U: 49.980000000000004 Hz
Step 1.7: Find out the minimum frequency at which Signal U should have been sampled to still contain all the information in the signal. Save it in variable fs_u.
Hint: Nyquist-Shannon theorem
# Your code here
# Nyquist-Shannon: the sampling frequency must exceed twice the highest
# frequency present in the signal.
fs_u = math.ceil(2 * b_u)  # rounding it up since the freq needs to be equal or higher
print(f"The optimal sampling rate required for Signal U: {fs_u} Hz")
The optimal sampling rate required for Signal U: 100 Hz
Step 1.8: Calculate, how many % of space is wasted by storing too many samples for Signal U. I.e., if we would resample in the Signal U at a sampling rate fs_u, how many samples would we store, and how much that is in relation to the number of samples in the CSV file?
If it is 0, why?
P.S. Don't worry about Signal X – the sampling system for it was designed by careless engineers who did not know about Nyquist-Shannon's sampling theorem. Therefore, the sampling of Signal X is not proper. But we work with what we have.
# Your code here
# Compare the actual sample count against what sampling at fs_u would require.
num_samples = signal_U_converted['value'].shape[0]
time_duration_in_seconds = (signal_U_converted['time'].max() - signal_U_converted['time'].min()).total_seconds()
optimal_num_samples = round(time_duration_in_seconds * fs_u)  # rounding it here since partial samples make no sense
print(f"Space wasted: {round((num_samples - optimal_num_samples) * 100 / num_samples)}%")
Space wasted: 0%
--- YOUR ANSWER HERE ---
The amplitude threshold dictates the highest constituent frequency and thus, by the Nyquist-Shannon theorem, the optimal sampling frequency. A 0.001 threshold yields an optimal sampling frequency of 100 Hz. Multiplying this by the duration of the signal gives 10,000 points, which equals the original number of points in the signal. Hence no space is wasted by the original sampling.
Note: whenever you modify something for the signals, it is suggested to store the modified signal in another variable. Keep the original one intact. You may later want to compare the two.
Step 2.1: Decimate (down-sample) the Signal U to 10Hz, store it in variable su_resampled:
# Your code here
# Decimate Signal U to 10 Hz. scipy.signal.resample does Fourier-domain
# resampling; passing the time column as the third argument (t) makes it
# return the new sample times alongside the resampled values.
num_samples_in_10hz = round(time_duration_in_seconds * 10)
su_resampled_intermediate = resample(signal_U_converted['value'], num_samples_in_10hz, signal_U_converted['time'])
su_resampled = pd.DataFrame(data={"value": su_resampled_intermediate[0], "time": su_resampled_intermediate[1]})
# Plotting resampled signal
plot_signals(su_resampled, signal_U_converted)
Step 2.2: Explain - why is the resampled signal not containing the same information as the original signal (i.e., what information is lost?)
--- YOUR ANSWER HERE ---
The resampled signal loses information because we resample it at 10 Hz, which is much lower than the optimal frequency of 100 Hz computed before. The information we lose is the high-frequency constituent components: the lower sampling rate can no longer capture them.
From now on, in all the places where you need to do something with Signal U, use the resampled version: su_resampled.
Step 2.3: Synchronize Signal X with Signal U, store it in variable sx_resampled. I.e., if su_resampled is sampled at time moments t0, t1, …, tN, then resample Signal X at the same time moments: t0, t1, …, tN. This may involve several steps,
depending on what functions/libraries you use.
Hint: see 07-2-Resampling.ipynb example notebook on Blackboard ().
Hint: your resulting sx_resampled should be 10Hz signal, not 100Hz.
# Your code here
# I am upsampling the signal to get better output during interpolation with
# respect to the reference signal, i.e. Signal U.
upsampled_X = (signal_X_converted.copy().set_index("time").resample('5ms').asfreq()).interpolate(method='linear').reset_index()
# Interpolate on POSIX timestamps (floats). datetime.timestamp works on the
# pandas Timestamps here because Timestamp subclasses datetime.
# NOTE(review): timestamp() interprets naive datetimes in local time — harmless
# here since both series go through the same conversion, but confirm if reused.
interpolation_fn = interp1d(upsampled_X["time"].apply(datetime.timestamp), upsampled_X["value"])
# Evaluate Signal X at exactly Signal U's resampled sample times
new_values = interpolation_fn(su_resampled["time"].apply(datetime.timestamp))
sx_resampled = pd.DataFrame({"value": new_values, "time": su_resampled["time"]})
# Plotting the graph
plot_signals(sx_resampled, signal_X_converted)
Step 2.4: Check if the two signals really are synchronized – compare the timestamps, these should be equal.
# Your code here
# Both resampled signals must share identical timestamps after synchronization
assert sx_resampled["time"].equals(su_resampled["time"])
Step 2.5: Take both signals and insert them into a single DataFrame object (name it composed_data) which has:
signalX and signalU containing the corresponding values (sx_resampled and su_resampled).
# Your code here
# Combine both synchronized signals into one time-indexed DataFrame
composed_data = pd.DataFrame({"time": sx_resampled["time"], "signalX": sx_resampled["value"], "signalU": su_resampled["value"] }).set_index("time")
composed_data  # display the composed table
| signalX | signalU | |
|---|---|---|
| time | ||
| 2017-08-29 10:30:00.000 | 241.170000 | 84.916675 |
| 2017-08-29 10:30:00.100 | 239.522632 | 0.184510 |
| 2017-08-29 10:30:00.200 | 238.130952 | 22.706012 |
| 2017-08-29 10:30:00.300 | 239.040476 | 12.151147 |
| 2017-08-29 10:30:00.400 | 239.950000 | 19.746830 |
| ... | ... | ... |
| 2017-08-29 10:31:39.500 | 257.840000 | 152.423106 |
| 2017-08-29 10:31:39.600 | 257.491667 | 149.991113 |
| 2017-08-29 10:31:39.700 | 258.273529 | 161.480336 |
| 2017-08-29 10:31:39.800 | 260.720588 | 141.190984 |
| 2017-08-29 10:31:39.900 | 262.680769 | 167.268946 |
1000 rows × 2 columns
In this part you will find extreme values in the signals. Typically, these could mean outliers, sampling errors or extreme modes of operation in the system (such as overheating of a motor).
Step 3.1: Find Signal U values (su_resampled) above 170.0. Store those in variable extreme_u_vals.
# Your code here
# Fixed-threshold extremes for Signal U: all values above 170.0
extreme_u_vals = su_resampled["value"][su_resampled["value"] > 170].to_numpy()
print(f"There are {extreme_u_vals.shape[0]} outliers")
There are 18 outliers
Step 3.2: Find Signal X values (sx_resampled) outside the range mean ± 2* StdDev. Store those in variable ex_static_x_vals.
# Your code here
# Static extremes for Signal X: anything farther than two standard deviations
# from the global mean (equivalent to value > mean+2s OR value < mean-2s).
mean = sx_resampled["value"].mean()
stdDev = sx_resampled["value"].std()
is_extreme = (sx_resampled["value"] - mean).abs() > 2 * stdDev
ex_static_x_vals = sx_resampled["value"][is_extreme].to_numpy()
print(f"There are {ex_static_x_vals.shape[0]} outliers")
There are 7 outliers
Step 3.3: Find Signal X values (sx_resampled) outside an adaptive range: 3-second moving average ± 2 * StdDev. Both the average and StdDev are calculated in a 3-second rolling window. Store the values in variable ex_dynamic_x_vals.
# Your code here
def find_init_window_idxs(signal, window_time):
    """Return (start_idx, stop_idx) of the first rolling window.

    stop_idx is the index of the first sample at or after
    signal['time'][0] + window_time seconds (len(signal) if none exists).
    BUG FIX: the original ignored its `signal` argument and read the global
    sx_resampled instead, making it unusable for any other signal.
    """
    window_start_idx = 0
    window_stop_idx = signal["time"].shape[0]
    time_stop = signal["time"][0] + timedelta(seconds=window_time)
    for idx in range(window_start_idx, window_stop_idx):
        if signal["time"][idx] >= time_stop:
            window_stop_idx = idx
            break
    return (window_start_idx, window_stop_idx)

def compute_stats(data_window):
    """Return a tuple where the 1st item is the mean and the 2nd is the std."""
    return (data_window.mean(), data_window.std())

def rolling_stat_generator(signal, TIME_WINDOW_in_S):
    """Annotate a copy of `signal` with rolling stats and a 2-sigma outlier flag.

    Adds three columns: rolling_mean / rolling_std (over a trailing window of
    TIME_WINDOW_in_S seconds) and is_outlier (True where the newest value in the
    window falls outside mean +/- 2*std). Assumes the default RangeIndex, as is
    true for the frames built in this notebook.
    """
    (window_start_idx, window_stop_idx) = find_init_window_idxs(signal, TIME_WINDOW_in_S)
    signal = signal.copy()
    signal["rolling_mean"] = None
    signal["rolling_std"] = None
    signal["is_outlier"] = False
    while window_stop_idx < signal["time"].shape[0]:
        # we cannot check for signals before the running average starts
        (mean, std) = compute_stats(signal["value"][window_start_idx:window_stop_idx + 1])
        # .loc avoids chained assignment (signal["col"][i] = v), which may write
        # to a temporary copy and is an error under pandas copy-on-write.
        signal.loc[window_stop_idx, "rolling_mean"] = mean
        signal.loc[window_stop_idx, "rolling_std"] = std
        value = signal["value"][window_stop_idx]
        if value > mean + 2 * std or value < mean - 2 * std:
            signal.loc[window_stop_idx, "is_outlier"] = True
        window_start_idx += 1
        window_stop_idx += 1
    return signal
TIME_WINDOW_in_S = 3  # rolling window length in seconds
# Annotate Signal X with 3-second rolling stats and extract the flagged values
ex_dynamic_x = rolling_stat_generator(sx_resampled, TIME_WINDOW_in_S)
ex_dynamic_x_vals = ex_dynamic_x[ex_dynamic_x["is_outlier"]]["value"].to_numpy()
print(f"There are {ex_dynamic_x_vals.shape[0]} outliers")
There are 155 outliers
Step 4.1: Plot a line chart with values, rolling mean, normal boundaries (mean +/- 2StdDev) and the extreme values ex_dynamic_x_vals that you calculated in Part 3. Example:
# Your code here
def plot_outliers(signal):
    """Plot the signal, its rolling mean, the +/- 2-sigma band and the outliers."""
    signal = signal.copy()
    signal["M + 2S"] = signal["rolling_mean"] + 2 * signal["rolling_std"]
    signal["M - 2S"] = signal["rolling_mean"] - 2 * signal["rolling_std"]
    outliers = signal[signal["is_outlier"]]
    traces = [
        go.Scatter(x=signal['time'], y=signal['value'], line=go.scatter.Line(dash="solid", width=1, color="blue"), name="Signal"),
        go.Scatter(x=signal['time'], y=signal['rolling_mean'], line=go.scatter.Line(dash="solid", width=1, color="yellow"), name="Rolling Mean"),
        go.Scatter(x=signal['time'], y=signal['M + 2S'], line=go.scatter.Line(dash="dot", width=1, color="green"), name="Rolling Mean + 2(STD)"),
        go.Scatter(x=signal['time'], y=signal['M - 2S'], line=go.scatter.Line(dash="dot", width=1, color="green"), name="Rolling Mean - 2(STD)"),
        go.Scatter(x=outliers['time'], y=outliers['value'], line=go.scatter.Line(dash="dot", width=1, color="red"), mode="markers", name="Outliers"),
    ]
    fig = go.Figure(traces)
    fig.layout.template = "plotly_dark"
    fig.update_layout(
        title=dict(font=dict(size=20), text="Outliers"),
        title_x=0.5,
        height=600,
        yaxis_title="Value",
        xaxis_title="Sample time",
    )
    fig.update_xaxes(tickangle=-45)
    fig.show()
plot_outliers(ex_dynamic_x)
Reference Image
Step 4.2: Find segments in Signal X (sx_resampled) where the 10-second moving average value is increasing for a continuous period of at least two seconds.
# Your code here
def check_positive_monotone_over_window(signal, time_in_s):
    """Flag samples that belong to a stretch where the rolling mean increases
    continuously for at least `time_in_s` seconds.

    Expects `signal` to already carry the boolean column 'is_increasing'
    (rolling-mean diff > 0). Returns (annotated_copy, new_column_name).
    Fixes vs. original: the loop bound read the global sx_resampled instead of
    `signal`, and flags were set via chained slice indexing, which is
    unreliable under pandas copy-on-write.
    """
    (window_start_idx, window_stop_idx) = find_init_window_idxs(signal, time_in_s)
    signal = signal.copy()
    n_key = f"is_increasing_{time_in_s}"
    signal[n_key] = False
    num_samples = signal["time"].shape[0]  # was: global sx_resampled
    while window_stop_idx < num_samples:
        # If we don't have a mean to compute on, then we continue to the next window
        if signal["rolling_mean"][window_start_idx] is None:
            window_start_idx += 1
            window_stop_idx += 1
            continue
        # check if mean diff is all positive within this window; the .loc slice
        # is inclusive on both ends, matching the original [start:stop+1] span
        # (assumes the default RangeIndex these frames carry)
        if signal["is_increasing"][window_start_idx:window_stop_idx + 1].all():
            signal.loc[window_start_idx:window_stop_idx, n_key] = True
        window_start_idx += 1
        window_stop_idx += 1
    return (signal, n_key)
TIME_WINDOW_in_S = 10  # rolling-mean window length in seconds
ex_dynamic_x_10 = rolling_stat_generator(sx_resampled, TIME_WINDOW_in_S)
# Find arr[x] = arr[x] - arr[x-1] (True where the rolling mean increased vs the previous sample)
ex_dynamic_x_10["is_increasing"] = ex_dynamic_x_10["rolling_mean"].fillna(0).diff() > 0
TIME_WINDOW_in_S_mono = 2  # minimum continuous-increase duration in seconds
(ex_dynamic_x_10_2, n_key) = check_positive_monotone_over_window(ex_dynamic_x_10, TIME_WINDOW_in_S_mono)
ex_dynamic_x_10_2_vals = ex_dynamic_x_10_2[ex_dynamic_x_10_2[n_key]]["value"].to_numpy()
Step 4.3: Plot a line chart and mark these regions (from previous step) in a different color.
For example: show the normal line in blue color and the continuously increasing moving average segments in green. Example:
# Your code here
def signal_splitter(signal, n_key):
    """Split `signal` into the contiguous runs of rows where column n_key is True.

    Returns a list of DataFrame slices, one per True-run, so each segment can
    be drawn as its own trace. NOTE(review): row access uses positional labels,
    so this assumes the default RangeIndex — confirm before reusing elsewhere.
    """
    result = []
    init_idx = 0   # start of the current candidate run
    final_idx = 0  # cursor scanning ahead for the run's end
    is_tracking = False  # True while we are inside a True-run
    while final_idx < signal.shape[0]:
        if signal[n_key][init_idx] == True:
            is_tracking = True
        else:
            # not inside a run: advance both cursors together
            init_idx += 1
            final_idx += 1
        if is_tracking == True:
            # close the run at the end of the frame or where the flag drops
            if (final_idx + 1) == signal.shape[0] or signal[n_key][final_idx + 1] == False:
                result.append(signal[init_idx: final_idx + 1])
                is_tracking = False
                final_idx += 1
                init_idx = final_idx
            else:
                final_idx += 1
    return result
def plot_positive_monotone(signal, n_key):
    """Plot the signal with its continuously-increasing rolling-mean segments
    overlaid in green, one trace per contiguous segment."""
    signal = signal.copy()
    # (removed an unused `increasing_segments` local present in the original)
    trace_array = [
        go.Scatter(x=signal['time'], y=signal['value'], line=go.scatter.Line(dash="solid", width=1, color="blue"), name="Signal"),
    ]
    # One green trace per contiguous increasing segment
    for data in signal_splitter(signal, n_key):
        trace_array.append(
            go.Scatter(
                mode="lines+markers",
                x=data['time'],
                y=data['rolling_mean'],
                line=go.scatter.Line(dash="solid", width=1, color="green"),
                showlegend=False
            ))
    fig = go.Figure(trace_array)
    fig.layout.template = "plotly_dark"
    fig.update_layout(
        title=dict(font=dict(size=20), text="2s Monotonic Increasing Segments"),
        title_x=0.5,
        height=600,
        yaxis_title="Value",
        xaxis_title="Sample time",
    )
    fig.update_xaxes(tickangle=-45)
    fig.show()
plot_positive_monotone(ex_dynamic_x_10_2, n_key)
Reference Image
Please reflect on the following questions:
A. Yeah it was fine. Not too tough, nor too easy.
2. How many hours did you spend on it?
A. 4hr
3. What was the most time-consuming part?
A. Part 2, 3, 4
4. If you need to do similar things later in your professional life, how can you improve? How can you do it more efficiently?
A. Having a list of helper functions as a toolkit might be useful
5. Was there something you would expect to learn that this exercise did not include?
A. No, it had everything I was hoping to learn
6. Was there something that does not make sense?
A. The sampling frequency in 1.6